Fix streaming thinking tags split across multiple chunks #3206
Conversation
Force-pushed fd1d0c2 to b04532c
@dsfaccini Thanks for working on this David. I think we need to test every plausible combination of strings and make sure we never lose text or events.
```python
start_tag, end_tag = thinking_tags

# Combine any buffered content with the new content
buffered = self._tag_buffer.get(vendor_part_id, '') if vendor_part_id is not None else ''
```
What if vendor_part_id is None? Will none of this work anymore? Should we require one for this method to work?
I believe the correct handling in that case would be to assume it's a TextPart
I added one more commit 0818191 to cover and test these cases:

- `optional-content</think>more-content` => `ThinkingPart("...optional-content")` + `TextPart("more-content")`
- `vendor_id is None` and chunk = `<think>start-of-thinking` => `ThinkingPart("start-of-thinking")`
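For illustration, those two cases might be exercised like this (a sketch assuming the generator-based API from this PR; the expected parts are taken from the list above):

```python
from pydantic_ai._parts_manager import ModelResponsePartsManager

manager = ModelResponsePartsManager()
tags = ('<think>', '</think>')

# Case 1: an open ThinkingPart receives 'optional-content</think>more-content'
list(manager.handle_text_delta(vendor_part_id='a', content='<think>', thinking_tags=tags))
list(manager.handle_text_delta(vendor_part_id='a', content='optional-content</think>more-content', thinking_tags=tags))
# expected parts: ThinkingPart('...optional-content'), TextPart('more-content')

# Case 2: vendor_part_id=None with a start tag plus trailing content
manager = ModelResponsePartsManager()
list(manager.handle_text_delta(vendor_part_id=None, content='<think>start-of-thinking', thinking_tags=tags))
# expected parts: ThinkingPart('start-of-thinking')
```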
tests/test_parts_manager.py (outdated)
```python
# Build up: "text <thi"
event = manager.handle_text_delta(vendor_part_id='content', content='text <thi', thinking_tags=thinking_tags)
assert event is None
```
If the parts manager is never called again after this, we'll lose this text 😬
This remains a valid concern: if the last chunk happens to be `<thi`, it will be lost (it gets buffered, but the parts manager is never called again).
I'll add a test for this and remediate.
I just pushed a commit to prevent this, together with new tests adc51e6
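A sketch of the scenario those tests presumably cover (the flush behavior is an assumption based on this thread):

```python
from pydantic_ai._parts_manager import ModelResponsePartsManager

manager = ModelResponsePartsManager()
tags = ('<think>', '</think>')

# The stream ends while '<thi' sits in the tag buffer...
events = list(manager.handle_text_delta(vendor_part_id='c', content='<thi', thinking_tags=tags))
assert events == []  # buffered, nothing emitted yet

# ...so finalize() must flush it as text instead of dropping it.
flushed = list(manager.finalize())
# expected: a PartStartEvent carrying TextPart(content='<thi')
```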
```python
    return self.handle_thinking_delta(vendor_part_id=vendor_part_id, content=combined_content)
else:
    # Not in thinking mode, look for start tag
    if start_tag in combined_content:
```
What if the model outputs <think> in the middle of its text, inside a code block with XML? We shouldn't treat that as thinking then, just text.
I don't know if there's a good way to prevent that. Previously, we were relying on the (weak) assumption that <think> as a standalone chunk always means the special THINK-START token, whereas <think> in regular text output would (maybe?) be split up over multiple chunks/tokens.
But that was not reliable anyway, as models may also be debouncing their own chunk streaming, meaning we'd get multiple tokens at once.
I'm worried about this breaking legitimate XML output though.
Maybe we should only do this at the start of a response, not allowing <think> portions in the middle of text output. And/or leave this off by default and require a ModelProfile setting to opt into it.
> What if the model outputs `<think>` in the middle of its text, inside a code block with XML? We shouldn't treat that as thinking then, just text.
This was my other concern and the secondary reason why I hadn't created a PR for the issue. I was having trouble determining whether this could happen (whether a `<think>` could show up in the middle of a response). I've seen claims that Claude models can emit `<reflection>` tags in the middle of a response, but I'm having a hard time finding any concrete references for this.
There is, however, a model called "Reflection 70B" that is clearly documented to do this. Its output seems to be more structured, though, with distinct `<thinking>`/`<reflection>`/`<output>` tags, so the issue of misinterpreting a `<think>` tag isn't possible there. But yeah, if we have a model-specific profile that can handle the parsing for these cases, that would address the issue.
@DouweM thank you for taking the time to review it! I refactored the event handler into returning a generator and am taking your comments into account. And I agree with your comment about the XML case, especially considering it's very much an edge case, and it might even fix itself in the future (by models producing chunks properly). I have a question: is it weird that all checks passed even though there may be a lot of breaking stuff in the PR? Does that mean we should add tests to cover these cases, or is there something I may be misunderstanding about the testing/CI process?
> I refactored the event handler into returning a generator and am taking into account your comments.

@dsfaccini Did you mean to have pushed already?
It doesn't break any existing tests since there are none that currently hit the split-tag code paths.
No, I didn't mean to push anything yet; I wanted to clear up that question first to make sure I wasn't misunderstanding something about the test suite. I also didn't want to push anything else before clearing up the XML case.
For now I'll write new tests to cover the edge cases you've pointed out, excluding the possibility (quoted above) of a `<think>` tag appearing in the middle of the text.
We should also have a test to ensure that in that case, we treat it as regular text!
…ering

Convert handle_text_delta() from returning a single event to yielding multiple events via a generator pattern. This enables proper handling of thinking tags that may be split across multiple streaming chunks.

Key changes:
- Convert handle_text_delta() return type from ModelResponseStreamEvent | None to Generator[ModelResponseStreamEvent, None, None]
- Add _tag_buffer field to track partial content across chunks
- Implement _handle_text_delta_simple() for non-thinking-tag cases
- Implement _handle_text_delta_with_thinking_tags() with buffering logic
- Add _could_be_tag_start() helper to detect potential split tags
- Update all model implementations (10 files) to iterate over events
- Adapt test_handle_text_deltas_with_think_tags for generator API

Behavior:
- Complete thinking tags work at any position (maintains original behavior)
- Split thinking tags are buffered when starting at position 0 of chunk
- Split tags only work when vendor_part_id is not None (buffering requirement)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
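At each call site, that conversion looks roughly like this (a sketch; `part_id` and `text` stand in for whatever each model implementation actually passes):

```python
# before: handle_text_delta returned a single optional event
maybe_event = self._parts_manager.handle_text_delta(
    vendor_part_id=part_id, content=text, thinking_tags=thinking_tags
)
if maybe_event is not None:
    yield maybe_event

# after: it yields zero or more events, so call sites iterate
for event in self._parts_manager.handle_text_delta(
    vendor_part_id=part_id, content=text, thinking_tags=thinking_tags
):
    yield event
```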
Hey guys! @DouweM, this PR got a bit confusing from the back and forth, especially the caveats I overlooked the first time around. These are the constraints we've arrived at through discussion:

Constraint 1: handle_text_delta() returns a generator of events instead of a single optional event.

Constraint 2: I won't change any existing test aside from adapting to the new return type; I'll create a new test file for this.

Constraint 3: the new functions in `_parts_manager.py` buffer chunks that look like the start of a thinking tag.

Next steps? My question to you is: should I force-push this restart into this PR, or open a new one?

Trajectory of this PR and reasoning for a new one:

- the current behavior (main branch)
- the problem with the current PR
- this is the restart prompt I used:

> I reviewed the original, that is the main-branch version of this function: tests/test_parts_manager.py::test_handle_text_deltas_with_think_tags, and noticed that WE changed it. We had to change tests because we're now returning a generator (which is approved in the PR discussion), but we SHOULD NOT have changed the logic of that function. The function clearly shows the following: a chunk of text arrives, then `<think>` arrives as a complete, standalone chunk, and it is treated as the start of a thinking part. The summary is: a thinking tag arrives after a text part, but because 1. it arrives in a chunk of its own and 2. it arrives in full, it is valid. Our PR wants to support split thinking tags; the question is whether we should support split thinking tags that arrive in any position, and so far the decision is NO: we want to support split thinking tags that arrive, at least, in the first position of their chunk. So current (main) behavior won't identify chunk 1: `<thi`, chunk 2: `nk>`. What we (wrongly) disallowed is the previous behavior, where a full text chunk can arrive before a thinking tag, that is: chunk 1: `text`, chunk 2: `<think>`. What we are explicitly disallowing, for now, is: chunk 1: `text<thi`, chunk 2: `nk>`.
@dsfaccini Please force push into this PR so we keep everything in one place!
Force-pushed 411d969 to 3439159
…hat look like thinking tags

Fixes two issues with thinking tag detection in streaming responses:

1. Support for tags with trailing content in the same chunk:
   - START tags: "<think>content" now correctly creates ThinkingPart("content")
   - END tags: "</think>after" now correctly closes thinking and creates TextPart("after")
   - Works for both complete and split tags across chunks
   - Implemented by splitting content at tag boundaries and recursively processing

2. Fix vendor_part_id=None content routing bug:
   - When vendor_part_id=None and content follows a start tag (e.g., "<think>thinking"), content is now routed to the existing ThinkingPart instead of creating a new TextPart
   - Added check in _handle_text_delta_simple to detect existing ThinkingPart

Implementation:
- Modified _handle_text_delta_simple to split content at START/END tag boundaries
- Modified _handle_text_delta_with_thinking_tags with symmetric split logic
- Added ThinkingPart detection for vendor_part_id=None case (lines 164-168)
- Kept pragma comments only on architecturally unreachable branches

Tests added: 11 new tests in test_parts_manager_split_tags.py
```python
def get(self) -> ModelResponse:
    """Build a [`ModelResponse`][pydantic_ai.messages.ModelResponse] from the data received from the stream so far."""
    # Flush any buffered content before building response
    for _ in self._parts_manager.finalize():
```
I think we need to call this from StreamedResponse.__aiter__ so that we also stream the event
added it to __aiter__. I realized calling get mid-stream would empty the buffer, messing up split-tag handling, so I'm addressing that in the next commit by cloning the parts manager in the get method.
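A minimal sketch of that cloning idea (names assumed; the real change lives in the get method): finalize a copy so the live manager keeps its tag buffer intact for subsequent chunks.

```python
import copy

def build_snapshot(parts_manager):
    # Finalize a deep copy so the live manager's buffer is untouched
    # and split-tag handling keeps working mid-stream.
    snapshot = copy.deepcopy(parts_manager)
    for _ in snapshot.finalize():
        pass  # events are discarded here; get() only needs the parts
    return snapshot.get_parts()
```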
tests/test_parts_manager.py (outdated)
```diff
  # Add a text part first
- manager.handle_text_delta(vendor_part_id='text', content='hello')
+ list(manager.handle_text_delta(vendor_part_id='text', content='hello'))
```
Why do we need list here and in the other places where we don't store the result in a variable?
Because handle_text_delta now returns a generator; if we don't iterate over it, it doesn't get evaluated at all.
Ah yeah, this is a bit less clear to me than async for e in ...: pass though
that makes sense, I'll update it to that
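The laziness being discussed, in miniature:

```python
def gen():
    print('body runs')
    yield 1

g = gen()    # nothing printed yet: calling a generator function doesn't run its body
for _ in g:  # iteration drives the body; now 'body runs' is printed
    pass
```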
```python
events = list(manager.handle_text_delta(vendor_part_id='content', content='nk>', thinking_tags=thinking_tags))
assert len(events) == 1
assert events[0] == snapshot(
    PartStartEvent(index=0, part=ThinkingPart(content='', part_kind='thinking'), event_kind='part_start')
)
```
Since we're doing more buffering now, could we make it so that we don't yield a ThinkingPart with content='' and only yield once we see actual text? I did it like this before because we were checking the most recent part to see if it was a ThinkingPart, to determine whether we're in "thinking mode", but we can now keep track of that more directly
```python
# Chunk 2: "<thi" - starts at position 0 of THIS chunk, buffer it
events = list(manager.handle_text_delta(vendor_part_id='content', content='<thi', thinking_tags=thinking_tags))
```
I think we decided to only parse <think> tags at the very start, right? So this should be yielded as a text part
Yeah, you're right. I didn't understand very well how chunks pass through the parts manager; looking at this now, I think what I need to specify is: if a text part has been created for a vendor id, disregard any thinking tags that later appear for that vendor id.
So I'll use _vendor_id_to_part_index to track whether text parts have been created for a vendor id and "disable" thinking tag recognition in those cases.
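Something like this sketch (the `_parts` field name is an assumption about the manager's internals):

```python
from pydantic_ai.messages import TextPart

def thinking_tags_active(manager, vendor_part_id) -> bool:
    """Hypothetical check: stop recognizing thinking tags for a vendor id
    once a TextPart has already been created for it."""
    index = manager._vendor_id_to_part_index.get(vendor_part_id)
    return index is None or not isinstance(manager._parts[index], TextPart)
```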
```python
# Complete start tag with vendor_part_id=None goes through simple path
# This covers lines 161-164 in _handle_text_delta_simple
events = list(manager.handle_text_delta(vendor_part_id=None, content='<think>', thinking_tags=thinking_tags))
```
If the model only ever outputs <think>, without any text following it, I think that should be a <think> text part
```python
# Add both empty and non-empty content to test the branch where buffered_content is falsy
# This ensures the loop continues after skipping the empty content
manager._thinking_tag_buffer['id1'] = ''  # Will be skipped  # pyright: ignore[reportPrivateUsage]
manager._thinking_tag_buffer['id2'] = 'content'  # Will be flushed  # pyright: ignore[reportPrivateUsage]
```
Do we need to test private fields?
Coverage; I'll remove this and add the pragmas back.
We should be able to get through every code path through public API usage right?
No, there's no path to get an empty string into the buffer through the public API. I added the test because coverage was complaining, but perhaps it was complaining because I was doing a defensive check. I removed it now and will check if it complains again.
```python
manager = ModelResponsePartsManager()
thinking_tags = ('<think>', '</think>')

# Single chunk: "prefix<think>thinking"
```
As mentioned above, I think we should only parse <think> tags at the start of text.
Some models will precede it with `\n`, so we should still support it in that case if `ignore_leading_whitespace` is enabled.
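A sketch of that allowance (assumed logic; `ignore_leading_whitespace` is the existing handle_text_delta parameter):

```python
def starts_with_think_tag(content: str, start_tag: str, ignore_leading_whitespace: bool) -> bool:
    """A '\\n<think>' chunk counts as a leading tag only when
    ignore_leading_whitespace is enabled."""
    if ignore_leading_whitespace:
        content = content.lstrip()
    return content.startswith(start_tag)

assert starts_with_think_tag('\n<think>', '<think>', ignore_leading_whitespace=True)
assert not starts_with_think_tag('\n<think>', '<think>', ignore_leading_whitespace=False)
```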
```python
thinking_tags = ('<think>', '</think>')

# With vendor_part_id=None and text before start tag
events = list(manager.handle_text_delta(vendor_part_id=None, content='text<think>', thinking_tags=thinking_tags))
```
I find these tests a bit hard to follow because we have to scan over multiple handle_text_delta calls to see what the scenario is that we're testing.
Could we add a helper that takes a list of text chunks, and returns a tuple of the yielded events + parts? So each of these tests can become helper(['<thi', 'nk>thou', 'ght</t', 'hink>'], vendor_part_id='...') == snapshot()
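A sketch of that helper (the name `stream_chunks` is made up; `finalize()` and `get_parts()` usage follows this PR's API):

```python
from pydantic_ai._parts_manager import ModelResponsePartsManager

def stream_chunks(chunks, vendor_part_id='content', thinking_tags=('<think>', '</think>')):
    """Feed text chunks through a fresh manager; return (events, parts)."""
    manager = ModelResponsePartsManager()
    events = []
    for chunk in chunks:
        events += list(manager.handle_text_delta(
            vendor_part_id=vendor_part_id, content=chunk, thinking_tags=thinking_tags
        ))
    events += list(manager.finalize())  # flush any buffered partial tag
    return events, manager.get_parts()

# usage, as proposed above:
# assert stream_chunks(['<thi', 'nk>thou', 'ght</t', 'hink>']) == snapshot(...)
```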
That way we can more easily scan whether our tests are exhaustive of all possible edge cases.
I'd like a test for something like ['<thi', 'nk>foo</thi', 'nk>'] for example, but it's hard to know right now if we're testing that
We should also test what happens when <think>foo</th is followed by a new TextPart with a different vendor_part_id, or a new part entirely (possibly even a ThinkingPart although that'd be weird)
> That way we can more easily scan whether our tests are exhaustive of all possible edge cases.
> I'd like a test for something like ['<thi', 'nk>foo</thi', 'nk>'] for example, but it's hard to know right now if we're testing that
I'll try to make that clearer, the helper will probably help with that too
> We should also test what happens when `<think>foo</th` is followed by a new TextPart with a different vendor_part_id, or a new part entirely (possibly even a ThinkingPart although that'd be weird)
this is related to your other request
> Since we're doing more buffering now, could we make it so that we don't yield a ThinkingPart with content='' and only yield once we see actual text? I did it like this before because we were checking the most recent part to see if it was a ThinkingPart, to determine whether we're in "thinking mode", but we can now keep track of that more directly
- as you said, the current behavior of `main` is to immediately emit the `ThinkingPart`, even before encountering content
- the current state of the PR is to buffer that `ThinkingPart` until we see content
- but as soon as we see content we emit a `PartStartEvent`
- so we really can't take that back once we've emitted it, right?
- that means we have to choose whether the handling of split thinking tags will
  1. disregard whether the tag closes, OR
  2. buffer the whole content after a thinking tag until it closes (naturally emitting it as text if it doesn't close)

Option 2 sounds like it has more potential for bugginess; at the very least the streaming behavior would be broken, right? Because we'd actually be waiting for the whole content before emitting anything. So I'd go for 1.

One weird edge case if we go for 1 is that we'll emit whatever the content is before the `<think>` completely closes, so if we get a partial end tag (`</th`) we'll infer it's part of the content and emit a `ThinkingPart(content='thinking foo</th')` (which sounds like a good enough trade-off to me, but let me know what you think we should do in that case).

I'll push the latest state that assumes 1 for now.
- fix mistral's event iterator (wasn't iterating over thinking events)
```python
# Flush any buffered content and stream finalize events
for finalize_event in self._parts_manager.finalize():
    if isinstance(finalize_event, PartStartEvent):
```
There are only two cases when finalize will have an effect: when we're buffering

- a split starting tag, i.e. `<th` → emits a `PartStartEvent`
- a split ending tag, i.e. `</th` → emits a `PartDeltaEvent`

Coverage is complaining that there's no test running through the `PartDeltaEvent` branch of this, so I need to figure out how to test it.
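A test sketch for that branch (the expected event type is an assumption based on the two cases above):

```python
from pydantic_ai._parts_manager import ModelResponsePartsManager

manager = ModelResponsePartsManager()
tags = ('<think>', '</think>')

# Open a thinking part, then leave a partial end tag in the buffer.
list(manager.handle_text_delta(vendor_part_id='c', content='<think>foo', thinking_tags=tags))
list(manager.handle_text_delta(vendor_part_id='c', content='</th', thinking_tags=tags))

events = list(manager.finalize())
# expected: a single PartDeltaEvent flushing '</th' into the thinking content
```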
- avoid emptying buffer mid-stream
Fixes #3007

When streaming responses from models like Gemini via LiteLLM, thinking tags can be split across multiple chunks (e.g., chunk 1: `"<thi"`, chunk 2: `"nk>content</think>"`). The existing implementation only detected complete tags that arrived as standalone chunks, causing split tags to be treated as regular text instead of being extracted into `ThinkingPart`.

Changes:

- `ModelResponsePartsManager`: accumulate content when it might be part of a split tag
- `handle_text_delta()`: detect complete tags across chunk boundaries while maintaining backward compatibility

Edit: what does this PR do
Constraint 1: `handle_text_delta()` in `_parts_manager.py` returns `Generator[ModelResponseStreamEvent, None, None]` instead of `ModelResponseStreamEvent | None`, so `if ... is None` checks are replaced with `for event in ...` loops.

Constraint 2: existing tests in `test_parts_manager.py` are unchanged aside from adapting to the new return type; new tests go in a new test file.

Constraint 3: `_parts_manager.py` will buffer chunks when a chunk arrives that looks like it will be a `<think>` tag (e.g. `<thi`); `foo<thi` will not get buffered, it'll be emitted as a `TextPart`.

Cases it does (should) and doesn't cover
✅ split thinking tags or full thinking tags

- `<thi` + `nk>thinking</th` + `ink>` -> `ThinkingPart("thinking")`
- `<thi` + `nk>th` + `inking` -> `ThinkingPart("thinking")`
- `<think>thinking` -> `ThinkingPart("thinking")`
- caveat: `<think>` with content, like in the previous examples, should be emitted as a `TextPart` if the tag isn't closed by the end of the stream. The current behavior (linked in the caveat) emits the `ThinkingPart` as soon as the tag comes along, so it would be consistent to do the same here
- `<think>` -> gets buffered until there's content for it
- partial (`<th`) and full (`<think>`) starting tags: if there's no content at the end of the stream, they'll be emitted as `TextPart`s, unlike the `main` behavior, which immediately emits a `PartStartEvent` with an empty thinking part (`ThinkingPart("")`), as can be seen by looking at _parts_manager.py

❌ thinking tags after a text part

- `foo<th` -> the thinking chunk needs to start with something that looks like a thinking part
- `foo<think>` -> the above applies even if the tag is complete
- `foo` + `<thi` + `nk>` + ... -> `TextPart("foo<think>...")` -> all text
- handled by checking whether text parts have already been created for this vendor id
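Using the test helper sketched earlier in the thread, these expectations would read like (outcomes assumed from the lists above):

```python
# ✅ split tags are reassembled into a ThinkingPart
events, parts = stream_chunks(['<thi', 'nk>thinking</th', 'ink>'])
# parts == [ThinkingPart(content='thinking')]

# ❌ a tag that follows text on the same stream stays text
events, parts = stream_chunks(['foo', '<thi', 'nk>bar'])
# parts == [TextPart(content='foo<think>bar')]
```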